"""
Runs all agent tasks in parallel (up to 10 at a time) using separate subprocesses.
Each task gets its own Python process, preventing browser session interference.
Fails with exit code 1 if 0% of tasks pass.
"""

import argparse
import asyncio
import glob
import json
import logging
import os
import sys
import warnings

import anyio
import yaml
from dotenv import load_dotenv
from pydantic import BaseModel

load_dotenv()
from browser_use import Agent, AgentHistoryList, BrowserProfile, BrowserSession, ChatBrowserUse
from browser_use.llm.google.chat import ChatGoogle
from browser_use.llm.messages import UserMessage

# --- CONFIG ---
MAX_PARALLEL = 10
TASK_DIR = (
	sys.argv[1]
	if len(sys.argv) > 1 and not sys.argv[1].startswith('--')
	else os.path.join(os.path.dirname(__file__), '../agent_tasks')
)
TASK_FILES = glob.glob(os.path.join(TASK_DIR, '*.yaml'))


class JudgeResponse(BaseModel):
	success: bool
	explanation: str


async def run_single_task(task_file):
	"""Run a single task in the current process (called by subprocess)"""
	try:
		print(f'[DEBUG] Starting task: {os.path.basename(task_file)}', file=sys.stderr)

		# Suppress all logging in subprocess to avoid interfering with JSON output
		logging.getLogger().setLevel(logging.CRITICAL)
		for logger_name in ['browser_use', 'telemetry', 'message_manager']:
			logging.getLogger(logger_name).setLevel(logging.CRITICAL)
		warnings.filterwarnings('ignore')

		print('[DEBUG] Loading task file...', file=sys.stderr)
		content = await anyio.Path(task_file).read_text()
		task_data = yaml.safe_load(content)
		task = task_data['task']
		judge_context = task_data.get('judge_context', ['The agent must solve the task'])
		max_steps = task_data.get('max_steps', 15)

		print(f'[DEBUG] Task: {task[:100]}...', file=sys.stderr)
		print(f'[DEBUG] Max steps: {max_steps}', file=sys.stderr)
		api_key = os.getenv('BROWSER_USE_API_KEY')
		if not api_key:
			print('[SKIP] BROWSER_USE_API_KEY is not set - skipping task evaluation', file=sys.stderr)
			return {
				'file': os.path.basename(task_file),
				'success': True,  # Mark as success so it doesn't fail CI
				'explanation': 'Skipped - API key not available (fork PR or missing secret)',
			}

		agent_llm = ChatBrowserUse(api_key=api_key)

		# Check if Google API key is available for judge LLM
		google_api_key = os.getenv('GOOGLE_API_KEY')
		if not google_api_key:
			print('[SKIP] GOOGLE_API_KEY is not set - skipping task evaluation', file=sys.stderr)
			return {
				'file': os.path.basename(task_file),
				'success': True,  # Mark as success so it doesn't fail CI
				'explanation': 'Skipped - Google API key not available (fork PR or missing secret)',
			}

		judge_llm = ChatGoogle(model='gemini-flash-lite-latest')
		print('[DEBUG] LLMs initialized', file=sys.stderr)

		# Each subprocess gets its own profile and session
		print('[DEBUG] Creating browser session...', file=sys.stderr)
		profile = BrowserProfile(
			headless=True,
			user_data_dir=None,
			chromium_sandbox=False,  # Disable sandbox for CI environment (GitHub Actions)
		)
		session = BrowserSession(browser_profile=profile)
		print('[DEBUG] Browser session created', file=sys.stderr)

		# Test if browser is working
		try:
			await session.start()
			from browser_use.browser.events import NavigateToUrlEvent

			event = session.event_bus.dispatch(NavigateToUrlEvent(url='https://httpbin.org/get', new_tab=True))
			await event
			print('[DEBUG] Browser test: navigation successful', file=sys.stderr)
			title = await session.get_current_page_title()
			print(f"[DEBUG] Browser test: got title '{title}'", file=sys.stderr)
		except Exception as browser_error:
			print(f'[DEBUG] Browser test failed: {str(browser_error)}', file=sys.stderr)
			print(
				f'[DEBUG] Browser error type: {type(browser_error).__name__}',
				file=sys.stderr,
			)

		print('[DEBUG] Starting agent execution...', file=sys.stderr)
		agent = Agent(task=task, llm=agent_llm, browser_session=session)

		try:
			history: AgentHistoryList = await agent.run(max_steps=max_steps)
			print('[DEBUG] Agent.run() returned successfully', file=sys.stderr)
		except Exception as agent_error:
			print(
				f'[DEBUG] Agent.run() failed with error: {str(agent_error)}',
				file=sys.stderr,
			)
			print(f'[DEBUG] Error type: {type(agent_error).__name__}', file=sys.stderr)
			# Re-raise to be caught by outer try-catch
			raise agent_error

		agent_output = history.final_result() or ''
		print('[DEBUG] Agent execution completed', file=sys.stderr)

		# Test if LLM is working by making a simple call
		try:
			response = await agent_llm.ainvoke([UserMessage(content="Say 'test'")])
			print(
				f'[DEBUG] LLM test call successful: {response.completion[:50]}',
				file=sys.stderr,
			)
		except Exception as llm_error:
			print(f'[DEBUG] LLM test call failed: {str(llm_error)}', file=sys.stderr)

		# Debug: capture more details about the agent execution
		total_steps = len(history.history) if hasattr(history, 'history') else 0
		last_action = history.history[-1] if hasattr(history, 'history') and history.history else None
		debug_info = f'Steps: {total_steps}, Final result length: {len(agent_output)}'
		if last_action:
			debug_info += f', Last action: {type(last_action).__name__}'

		# Log to stderr so it shows up in GitHub Actions (won't interfere with JSON output to stdout)
		print(f'[DEBUG] Task {os.path.basename(task_file)}: {debug_info}', file=sys.stderr)
		if agent_output:
			print(
				f'[DEBUG] Agent output preview: {agent_output[:200]}...',
				file=sys.stderr,
			)
		else:
			print('[DEBUG] Agent produced no output!', file=sys.stderr)

		criteria = '\n- '.join(judge_context)
		judge_prompt = f"""
You are a evaluator of a browser agent task inside a ci/cd pipeline. Here was the agent's task:
{task}

Here is the agent's output:
{agent_output if agent_output else '[No output provided]'}

Debug info: {debug_info}

Criteria for success:
- {criteria}

Reply in JSON with keys: success (true/false), explanation (string).
If the agent provided no output, explain what might have gone wrong.
"""
		response = await judge_llm.ainvoke([UserMessage(content=judge_prompt)], output_format=JudgeResponse)
		judge_response = response.completion

		result = {
			'file': os.path.basename(task_file),
			'success': judge_response.success,
			'explanation': judge_response.explanation,
		}

		# Clean up session before returning
		await session.kill()

		return result

	except Exception as e:
		# Ensure session cleanup even on error
		try:
			await session.kill()
		except Exception:
			pass

		return {
			'file': os.path.basename(task_file),
			'success': False,
			'explanation': f'Task failed with error: {str(e)}',
		}


async def run_task_subprocess(task_file, semaphore):
	"""Run a task in a separate subprocess"""
	async with semaphore:
		try:
			# Set environment to reduce noise in subprocess
			env = os.environ.copy()
			env['PYTHONPATH'] = os.pathsep.join(sys.path)

			proc = await asyncio.create_subprocess_exec(
				sys.executable,
				__file__,
				'--task',
				task_file,
				stdout=asyncio.subprocess.PIPE,
				stderr=asyncio.subprocess.PIPE,
				env=env,
			)
			stdout, stderr = await proc.communicate()

			if proc.returncode == 0:
				try:
					# Parse JSON result from subprocess
					stdout_text = stdout.decode().strip()
					stderr_text = stderr.decode().strip()

					# Display subprocess debug logs
					if stderr_text:
						print(f'[SUBPROCESS {os.path.basename(task_file)}] Debug output:')
						for line in stderr_text.split('\n'):
							if line.strip():
								print(f'  {line}')

					# Find the JSON line (should be the last line that starts with {)
					lines = stdout_text.split('\n')
					json_line = None
					for line in reversed(lines):
						line = line.strip()
						if line.startswith('{') and line.endswith('}'):
							json_line = line
							break

					if json_line:
						result = json.loads(json_line)
						print(f'[PARENT] Task {os.path.basename(task_file)} completed: {result["success"]}')
					else:
						raise ValueError(f'No JSON found in output: {stdout_text}')

				except (json.JSONDecodeError, ValueError) as e:
					result = {
						'file': os.path.basename(task_file),
						'success': False,
						'explanation': f'Failed to parse subprocess result: {str(e)[:100]}',
					}
					print(f'[PARENT] Task {os.path.basename(task_file)} failed to parse: {str(e)}')
					print(f'[PARENT] Full stdout was: {stdout.decode()[:500]}')
			else:
				stderr_text = stderr.decode().strip()
				result = {
					'file': os.path.basename(task_file),
					'success': False,
					'explanation': f'Subprocess failed (code {proc.returncode}): {stderr_text[:200]}',
				}
				print(f'[PARENT] Task {os.path.basename(task_file)} subprocess failed with code {proc.returncode}')
				if stderr_text:
					print(f'[PARENT] stderr: {stderr_text[:1000]}')
				stdout_text = stdout.decode().strip()
				if stdout_text:
					print(f'[PARENT] stdout: {stdout_text[:1000]}')
		except Exception as e:
			result = {
				'file': os.path.basename(task_file),
				'success': False,
				'explanation': f'Failed to start subprocess: {str(e)}',
			}
			print(f'[PARENT] Failed to start subprocess for {os.path.basename(task_file)}: {str(e)}')

		return result


async def main():
	"""Run all tasks in parallel using subprocesses"""
	semaphore = asyncio.Semaphore(MAX_PARALLEL)

	print(f'Found task files: {TASK_FILES}')

	if not TASK_FILES:
		print('No task files found!')
		return 0, 0

	# Run all tasks in parallel subprocesses
	tasks = [run_task_subprocess(task_file, semaphore) for task_file in TASK_FILES]
	results = await asyncio.gather(*tasks)

	passed = sum(1 for r in results if r['success'])
	total = len(results)

	print('\n' + '=' * 60)
	print(f'{"RESULTS":^60}\n')

	# Prepare table data
	headers = ['Task', 'Success', 'Reason']
	rows = []
	for r in results:
		status = '✅' if r['success'] else '❌'
		rows.append([r['file'], status, r['explanation']])

	# Calculate column widths
	col_widths = [max(len(str(row[i])) for row in ([headers] + rows)) for i in range(3)]

	# Print header
	header_row = ' | '.join(headers[i].ljust(col_widths[i]) for i in range(3))
	print(header_row)
	print('-+-'.join('-' * w for w in col_widths))

	# Print rows
	for row in rows:
		print(' | '.join(str(row[i]).ljust(col_widths[i]) for i in range(3)))

	print('\n' + '=' * 60)
	print(f'\n{"SCORE":^60}')
	print(f'\n{"=" * 60}\n')
	print(f'\n{"*" * 10}  {passed}/{total} PASSED  {"*" * 10}\n')
	print('=' * 60 + '\n')

	# Output results for GitHub Actions
	print(f'PASSED={passed}')
	print(f'TOTAL={total}')

	# Output detailed results as JSON for GitHub Actions
	detailed_results = []
	for r in results:
		detailed_results.append(
			{
				'task': r['file'].replace('.yaml', ''),
				'success': r['success'],
				'reason': r['explanation'],
			}
		)

	print('DETAILED_RESULTS=' + json.dumps(detailed_results))

	return passed, total


if __name__ == '__main__':
	parser = argparse.ArgumentParser()
	parser.add_argument('--task', type=str, help='Path to a single task YAML file (for subprocess mode)')
	args = parser.parse_args()

	if args.task:
		# Subprocess mode: run a single task and output ONLY JSON
		try:
			result = asyncio.run(run_single_task(args.task))
			# Output ONLY the JSON result, nothing else
			print(json.dumps(result))
		except Exception as e:
			# Even on critical failure, output valid JSON
			error_result = {
				'file': os.path.basename(args.task),
				'success': False,
				'explanation': f'Critical subprocess error: {str(e)}',
			}
			print(json.dumps(error_result))
	else:
		# Parent process mode: run all tasks in parallel subprocesses
		passed, total = asyncio.run(main())
		# Results already printed by main() function

		# Fail if 0% pass rate (all tasks failed)
		if total > 0 and passed == 0:
			print('\n❌ CRITICAL: 0% pass rate - all tasks failed!')
			sys.exit(1)
